[spark] Refactor BatchWrite subclasses into base logic + per-version wrappers#7723
[spark] Refactor BatchWrite subclasses into base logic + per-version wrappers#7723kerwin-zk wants to merge 1 commit intoapache:masterfrom
Conversation
b4569e5 to
d274193
Compare
There was a problem hiding this comment.
Pull request overview
Refactors Spark V2 BatchWrite implementations to avoid Spark 4.1’s inherited BatchWrite.commit(.., WriteSummary) signature from leaking into classes used on Spark 4.0 runtimes (which can trigger ClassNotFoundException: WriteSummary during task serialization). The change centralizes write business logic in Spark-version-agnostic base classes and moves the extends BatchWrite mixin into per-version thin wrappers constructed via SparkShim factories.
Changes:
- Introduce
PaimonBatchWriteBase/FormatTableBatchWriteBaseinpaimon-spark-common(do not extendBatchWrite) and add per-version wrapper classes that mix inBatchWrite. - Add
SparkShim.createPaimonBatchWriteandSparkShim.createFormatTableBatchWritefactories and route call sites throughSparkShimLoader. - Add Spark 4.0 shadow wrappers (and shim wiring) to ensure Spark 4.0-target-compiled class metadata is used at runtime.
Reviewed changes
Copilot reviewed 14 out of 14 changed files in this pull request and generated no comments.
Show a summary per file
| File | Description |
|---|---|
| paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala | Implements new SparkShim factories for Spark 4.x to construct version-compiled BatchWrite wrappers. |
| paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala | Spark 4.x thin BatchWrite wrapper delegating to PaimonBatchWriteBase. |
| paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/format/FormatTableBatchWrite.scala | Spark 4.x thin BatchWrite wrapper delegating to FormatTableBatchWriteBase. |
| paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala | Implements new SparkShim factories for Spark 3.x to construct version-compiled BatchWrite wrappers. |
| paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala | Spark 3.x thin BatchWrite wrapper delegating to PaimonBatchWriteBase. |
| paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/format/FormatTableBatchWrite.scala | Spark 3.x thin BatchWrite wrapper delegating to FormatTableBatchWriteBase. |
| paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala | Adds BatchWrite factory methods to route instantiation through per-version shims. |
| paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala | Switches V2 batch write construction to SparkShimLoader.shim.createPaimonBatchWrite. |
| paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWriteBase.scala | Extracts BatchWrite logic into a Spark-version-agnostic base class (no extends BatchWrite). |
| paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala | Switches FormatTable batch write construction to SparkShimLoader.shim.createFormatTableBatchWrite and removes the inline BatchWrite impl. |
| paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/FormatTableBatchWriteBase.scala | Extracts FormatTable BatchWrite logic into a base class (no extends BatchWrite). |
| paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala | Spark 4.0 shim override wiring the new factories to Spark-4.0-compiled wrappers. |
| paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala | Spark 4.0-compatible shadow BatchWrite wrapper delegating to PaimonBatchWriteBase. |
| paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/format/FormatTableBatchWrite.scala | Spark 4.0-compatible shadow BatchWrite wrapper delegating to FormatTableBatchWriteBase. |
Comments suppressed due to low confidence (2)
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/FormatTableBatchWriteBase.scala:74
batchWriteBuilder.newCommit()returns a commit object that should be closed. Here the commit is never closed (including on exceptions), which can leak resources/file handles. Wrap the commit in try/finally (or equivalent) and close it in the finally block while preserving the existing error logging.
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/FormatTableBatchWriteBase.scala:86abortMessagescreates a new commit viabatchWriteBuilder.newCommit()but never closes it. Close the commit in a finally block after aborting to avoid leaking resources.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Purpose
Follow-up of #7648 (Spark 4.1 module) and a sibling of #7721. After landing the reverse-shim layout, two of the files under
paimon-spark-4.0/src/mainonly existed as shadows because their compilation unit defined a Scala class thatextends BatchWrite. Spark 4.1 added a default methodBatchWrite.commit(WriterCommitMessage[], WriteSummary)whoseWriteSummaryparameter type does not exist on Spark 4.0; a class compiled against 4.1 that mixes inBatchWritecarries the inheritedcommit(.., WriteSummary)signature in its method table, which JVMObjectStreamClass.getPrivateMethodlazy-links during Spark task serialization and crashes 4.0 withClassNotFoundException: WriteSummary.This PR refactors both affected classes into the same base + per-version wrapper pattern:
PaimonBatchWrite(used by V2 writes)FormatTableBatchWrite(used byFormatTableV2 writes — was previously aprivate case classinsidePaimonFormatTable.scala)For each, the body lives in a new abstract base in
paimon-spark-commonthat deliberately does not extendBatchWrite(renamed protected helpers:commitMessages,abortMessages,createPaimonDataWriterFactory,createFormatTableDataWriterFactory). Each per-version module (paimon-spark3-common,paimon-spark4-common,paimon-spark-4.0/src/main) ships a thin wrapper that mixes inBatchWriteand forwards the fourBatchWritemethods to the base helpers. Routing happens through two newSparkShimfactories so each Spark version's scalac compiles the rightextends BatchWritemixin.The Spark 4.0 shadow of
PaimonFormatTable.scalais no longer needed and is deleted; only the new thinFormatTableBatchWrite.scalawrapper remains underpaimon-spark-4.0/src/main.Tests
CI
API and Format
No new public API. Two internal factories added to
org.apache.spark.sql.paimon.shims.SparkShim:createPaimonBatchWrite(table, writeSchema, dataSchema, overwritePartitions, copyOnWriteScan)createFormatTableBatchWrite(table, overwriteDynamic, overwritePartitions, writeSchema)Documentation
No user-facing changes.